/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal;

import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.After;
import org.junit.Test;


/**
 * Timing-based tests for {@link ScheduledThreadPoolExecutorWithKeepAlive}.
 *
 * <p>
 * Each test creates its own executor and stores it in {@link #ex}; {@link #tearDown()} shuts it
 * down afterwards. The tests cover immediate submission, delayed and fixed-rate scheduling,
 * shrinking of the pool after tasks finish, and the behavior of {@code shutdown()} and
 * {@code shutdownNow()} with queued and running tasks.
 *
 * <p>
 * Most assertions compare wall-clock durations measured with {@link System#nanoTime()}, so the
 * tests sleep for several seconds by design.
 */
public class ScheduledThreadPoolExecutorWithKeepAliveJUnitTest {

  /**
   * time, in nanoseconds, that we should tolerate as slop (evidently needed for windows)
   */
  private static final long SLOP = TimeUnit.MILLISECONDS.toNanos(20);

  /** Executor under test; assigned by each test method and shut down in {@link #tearDown()}. */
  ScheduledThreadPoolExecutorWithKeepAlive ex;

  /**
   * Disables the "run after shutdown" policies, shuts the executor down immediately and asserts
   * that it terminates within 10 seconds.
   */
  @After
  public void tearDown() throws Exception {
    if (ex == null) {
      // The test failed before it created an executor; do not mask that failure with an NPE.
      return;
    }
    ex.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
    ex.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    ex.shutdownNow();
    assertTrue(ex.awaitTermination(10, TimeUnit.SECONDS));
  }

  /**
   * Builds an executor with the default thread factory and a {@code null} last constructor
   * argument, exactly as every test in this class needs it.
   *
   * @param poolSize first constructor argument; the tests below observe it as the pool's maximum
   *        size
   * @param keepAliveSeconds second constructor argument, expressed in seconds
   */
  private static ScheduledThreadPoolExecutorWithKeepAlive newExecutor(int poolSize,
      long keepAliveSeconds) {
    return new ScheduledThreadPoolExecutorWithKeepAlive(poolSize, keepAliveSeconds,
        TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
  }

  /**
   * Sleeps for the given number of milliseconds and calls {@code fail("interrupted")} if the
   * sleep is interrupted.
   */
  private static void sleepOrFail(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      fail("interrupted");
    }
  }

  /** Returns the number of nanoseconds elapsed since {@code startNanos}. */
  private static long nanosSince(long startNanos) {
    return System.nanoTime() - startNanos;
  }

  /**
   * Submits a {@link Runnable} and then a {@link Callable}, waits on both futures and checks
   * that each task ran to completion.
   */
  @Test
  public void testFuture() throws InterruptedException, ExecutionException {
    ex = newExecutor(5, 60);
    final AtomicBoolean done = new AtomicBoolean();

    Future<?> first = ex.submit(() -> {
      sleepOrFail(1000);
      done.set(true);
    });
    first.get();
    assertTrue("Task did not complete", done.get());

    Thread.sleep(2000); // let the thread finish with the task

    Callable<Boolean> slowTrue = () -> {
      sleepOrFail(1000);
      return Boolean.TRUE;
    };
    Future<?> second = ex.submit(slowTrue);
    assertTrue("Task did not complete", (Boolean) second.get());

    assertEquals(2, ex.getLargestPoolSize());
  }

  /**
   * Submits 50 one-second tasks at once, checks that the pool grew to 50 threads to run them,
   * and then checks that the pool shrinks back to a single thread once it is idle.
   */
  @Test
  public void testConcurrentExecutionAndExpiration()
      throws InterruptedException, ExecutionException {
    final int taskCount = 50;
    ex = newExecutor(taskCount, 1);

    Runnable waitForABit = () -> sleepOrFail(1000);

    Future<?>[] futures = new Future<?>[taskCount];
    for (int i = 0; i < taskCount; i++) {
      futures[i] = ex.submit(waitForABit);
    }
    long start = System.nanoTime();

    for (Future<?> future : futures) {
      future.get();
    }
    long elapsed = nanosSince(start);

    // Run serially the tasks would need 50 seconds; anything below that means they overlapped.
    assertTrue("Tasks did not execute in parallel. Waited "
        + TimeUnit.NANOSECONDS.toMillis(elapsed) + " ms",
        TimeUnit.NANOSECONDS.toSeconds(elapsed) < taskCount);

    assertEquals(taskCount, ex.getLargestPoolSize());

    // now make sure we expire back down.
    Thread.sleep(5000);
    assertEquals(1, ex.getPoolSize());
  }

  /**
   * Schedules 50 fixed-rate tasks (period one second, each sleeping 500 ms), lets them run for
   * ten seconds and checks that more than 300 executions happened, that the pool grew to 50
   * threads, and that it shrinks back to one thread after the tasks are cancelled.
   */
  @Test
  public void testConcurrentRepeatedTasks() throws InterruptedException, ExecutionException {
    final int taskCount = 50;
    ex = newExecutor(taskCount, 1);

    final AtomicInteger counter = new AtomicInteger();
    Runnable waitForABit = () -> {
      counter.incrementAndGet();
      sleepOrFail(500);
    };

    Future<?>[] futures = new Future<?>[taskCount];
    for (int i = 0; i < taskCount; i++) {
      futures[i] = ex.scheduleAtFixedRate(waitForABit, 0, 1, TimeUnit.SECONDS);
    }

    Thread.sleep(10000);

    for (Future<?> future : futures) {
      future.cancel(true);
    }

    int executions = counter.get();
    assertTrue("Tasks did not execute in parallel. Expected more than 300 executions, got "
        + executions, executions > 300);

    assertEquals(taskCount, ex.getLargestPoolSize());

    // now make sure we expire back down.
    Thread.sleep(5000);
    assertEquals(1, ex.getPoolSize());
  }

  /**
   * Schedules a no-op task with a ten second delay and checks that its future does not complete
   * earlier than that (allowing for {@link #SLOP}).
   */
  @Test
  public void testDelayedExcecution() throws InterruptedException, ExecutionException {
    ex = newExecutor(50, 1);
    long start = System.nanoTime();
    Future<?> f = ex.schedule(() -> {
    }, 10, TimeUnit.SECONDS);
    f.get();
    long elapsed = nanosSince(start);
    assertTrue("Execution was not delayed 10 seconds, only "
        + TimeUnit.NANOSECONDS.toMillis(elapsed) + " ms",
        TimeUnit.SECONDS.toNanos(10) <= elapsed + SLOP);
  }

  /**
   * Schedules a fixed-rate counter task, waits until it has run more than once, cancels it and
   * checks that the counter no longer changes afterwards.
   */
  @Test
  public void testRepeatedExecution() throws InterruptedException {
    ex = newExecutor(50, 1);

    final AtomicInteger counter = new AtomicInteger();
    Runnable increment = counter::incrementAndGet;
    ScheduledFuture<?> f = ex.scheduleAtFixedRate(increment, 0, 1, TimeUnit.SECONDS);

    await()
        .untilAsserted(
            () -> assertTrue("Task was not executed repeatedly", counter.get() > 1));
    await()
        .untilAsserted(() -> assertTrue("The task could not be cancelled", f.cancel(true)));
    await()
        .untilAsserted(
            () -> assertTrue("Task was not cancelled within 30 sec", f.isCancelled()));

    int oldValue = counter.get();
    Thread.sleep(5000);
    assertEquals("Task was not cancelled", oldValue, counter.get());
  }

  /**
   * Checks that {@code shutdown()} followed by {@code awaitTermination} waits for a task that is
   * still waiting for its delay to expire: the task starts after two seconds and sleeps three,
   * so termination must take more than four seconds.
   */
  @Test
  public void testShutdown() throws InterruptedException {
    ex = newExecutor(50, 1);
    ex.schedule(() -> {
      // Sleeps 3 seconds (like testShutdown2) so the 4 second lower bound asserted below is
      // cleared with plenty of margin beyond SLOP.
      sleepOrFail(3000);
      System.out.println("Finished scheduled task");
    }, 2, TimeUnit.SECONDS);
    long start = System.nanoTime();
    ex.shutdown();
    assertTrue(ex.awaitTermination(10, TimeUnit.SECONDS));
    long elapsed = nanosSince(start);
    assertTrue("Shutdown did not wait for task to complete. Only waited "
        + TimeUnit.NANOSECONDS.toMillis(elapsed), TimeUnit.SECONDS.toNanos(4) < elapsed + SLOP);
  }

  /**
   * Checks that {@code shutdown()} followed by {@code awaitTermination} waits for a task that has
   * already been handed to a worker: the three second task is given 500 ms to start, so
   * termination must take more than two seconds.
   */
  @Test
  public void testShutdown2() throws InterruptedException {
    ex = newExecutor(50, 1);
    ex.submit(() -> sleepOrFail(3000));
    // give it a chance to get in the worker pool
    Thread.sleep(500);
    ex.shutdown();
    long start = System.nanoTime();
    assertTrue(ex.awaitTermination(10, TimeUnit.SECONDS));
    long elapsed = nanosSince(start);
    assertTrue("Shutdown did not wait to task to complete. Only waited "
        + TimeUnit.NANOSECONDS.toMillis(elapsed), TimeUnit.SECONDS.toNanos(2) < elapsed);
  }

  /**
   * Checks that {@code shutdownNow()} does not wait for a task whose delay has not expired yet:
   * the executor must terminate within one second.
   */
  @Test
  public void testShutdownNow() throws InterruptedException {
    ex = newExecutor(50, 1);
    ex.schedule(() -> sleepOrFail(2000), 2, TimeUnit.SECONDS);
    ex.shutdownNow();
    long start = System.nanoTime();
    assertTrue(ex.awaitTermination(1, TimeUnit.SECONDS));
    long elapsed = nanosSince(start);
    assertTrue(
        "ShutdownNow should not have waited. Waited " + TimeUnit.NANOSECONDS.toMillis(elapsed),
        TimeUnit.SECONDS.toNanos(2) > elapsed);
  }

  /**
   * Checks that {@code shutdownNow()} does not wait for a task that is already running: the two
   * second task is given 500 ms to start, and the executor must still terminate within one
   * second of {@code shutdownNow()}.
   */
  @Test
  public void testShutdownNow2() throws InterruptedException {
    ex = newExecutor(50, 1);
    ex.submit(() -> sleepOrFail(2000));
    // give it a chance to get in the worker pool.
    Thread.sleep(500);
    ex.shutdownNow();
    long start = System.nanoTime();
    assertTrue(ex.awaitTermination(1, TimeUnit.SECONDS));
    long elapsed = nanosSince(start);
    assertTrue(
        "ShutdownNow should not have waited. Waited " + TimeUnit.NANOSECONDS.toMillis(elapsed),
        TimeUnit.SECONDS.toNanos(2) > elapsed);
  }

  /**
   * Checks that, with the execute-existing-delayed-tasks-after-shutdown policy turned off,
   * {@code shutdown()} does not wait for a task scheduled five seconds in the future: the
   * executor must terminate in under two seconds.
   */
  @Test
  public void testShutdownDelayedTasks() throws InterruptedException {
    ex = newExecutor(50, 1);
    ex.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    ex.schedule(() -> sleepOrFail(2000), 5000, TimeUnit.MILLISECONDS);
    ex.shutdown();
    long start = System.nanoTime();
    assertTrue(ex.awaitTermination(30, TimeUnit.SECONDS));
    long elapsed = nanosSince(start);
    assertTrue("Shutdown should not have waited. Waited " + TimeUnit.NANOSECONDS.toMillis(elapsed),
        TimeUnit.SECONDS.toNanos(2) > elapsed);
  }

  /**
   * Submits 100 half-second tasks to an executor created with a size of six, checks that
   * submitting did not block the caller, and checks that all 100 tasks eventually ran.
   */
  @Test
  public void testAllWorkersActive() throws InterruptedException {
    ex = newExecutor(6, 1);
    final AtomicInteger counter = new AtomicInteger();

    long start = System.nanoTime();
    for (int i = 0; i < 100; i++) {
      ex.submit(() -> {
        sleepOrFail(500);
        counter.incrementAndGet();
      });
    }

    long elapsed = nanosSince(start);
    assertTrue("calling ex.submit blocked the caller", TimeUnit.SECONDS.toNanos(1) > elapsed);

    Thread.sleep(20 * 500 + 1000);

    assertEquals(100, counter.get());
    assertEquals(6, ex.getMaximumPoolSize());
  }
}
